/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.solr.cloud;

import static java.util.Collections.singletonList;

import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import io.opentelemetry.api.metrics.MeterProvider;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.SolrQuery;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.ZkTestServer.LimitViolationAction;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.metrics.SolrMetricManager;
import org.apache.solr.util.SolrMetricTestUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test PeerSync when a node restarts and documents are indexed when node was down.
 *
 * <p>This test is modeled after SyncSliceTest
 */
public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  // Flipped to true at the very end of test(); when false, distribTearDown dumps the layout.
  private boolean success = false;
  // Monotonically increasing doc id, shared between test() and the background indexing thread.
  int docId = 0;

  // Jetty runners currently shut down; used to compute the expected set of live replicas.
  List<CloudJettyRunner> nodesDown = new ArrayList<>();

  /**
   * Clears the system properties installed by {@link #distribSetUp()} and, if the test did not
   * succeed, asks the superclass teardown to print the cluster layout for debugging.
   */
  @Override
  public void distribTearDown() throws Exception {
    if (!success) {
      printLayoutOnTearDown = true;
    }
    for (String prop :
        new String[] {
          "distribUpdateSoTimeout",
          "solr.directoryFactory",
          "solr.ulog.numRecordsToKeep",
          "tests.zk.violationReportAction"
        }) {
      System.clearProperty(prop);
    }
    super.distribTearDown();
  }

  /** Configures the base test for one shard backed by three replicas. */
  public PeerSyncReplicationTest() {
    // implicit super() — the explicit call added nothing
    sliceCount = 1;
    fixShardCount(3);
  }

  /** Uses the tlog-enabled config so the update log is available for PeerSync after restarts. */
  @Override
  protected String getCloudSolrConfig() {
    return "solrconfig-tlog.xml";
  }

  /**
   * Installs the system properties this test depends on before the cluster is started, then
   * delegates to the superclass setup.
   */
  @Override
  public void distribSetUp() throws Exception {
    final String[][] testProps = {
      // small socket timeout so replicas are not put into LIR state when they restart
      {"distribUpdateSoTimeout", "3000"},
      // tlog gets deleted after node restarts with CachingDirectoryFactory; StandardDirectoryFactory
      // keeps it intact, which PeerSync requires
      {"solr.directoryFactory", "solr.StandardDirectoryFactory"},
      // retain enough update-log records for PeerSync to replay missed updates
      {"solr.ulog.numRecordsToKeep", "1000"},
      {"tests.zk.violationReportAction", LimitViolationAction.IGNORE.toString()},
    };
    for (String[] prop : testProps) {
      System.setProperty(prop[0], prop[1]);
    }
    super.distribSetUp();
  }

  @Test
  // commented 2-Aug-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
  public void test() throws Exception {
    handle.clear();
    handle.put("timestamp", SKIPVAL);

    waitForThingsToLevelOut(30, TimeUnit.SECONDS);

    del("*:*");

    // index enough docs and commit to establish frame of reference for PeerSync
    for (int i = 0; i < 100; i++) {
      indexDoc(id, docId, i1, 50, tlong, 50, t1, "document number " + docId++);
    }
    commit();
    waitForThingsToLevelOut(30, TimeUnit.SECONDS);

    try {
      checkShardConsistency(false, true);

      // sanity check: every indexed doc is visible through the cloud client
      long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
      assertEquals(docId, cloudClientDocs);

      CloudJettyRunner initialLeaderJetty = shardToLeaderJetty.get("shard1");
      List<CloudJettyRunner> otherJetties = getOtherAvailableJetties(initialLeaderJetty);
      // pick one replica that will stay down for most of the test and never become leader
      CloudJettyRunner neverLeader = otherJetties.get(otherJetties.size() - 1);
      otherJetties.remove(neverLeader);

      // first shutdown a node that will never be a leader
      forceNodeFailures(singletonList(neverLeader));

      // node failure and recovery via PeerSync
      log.info("Forcing PeerSync");
      CloudJettyRunner nodePeerSynced = forceNodeFailureAndDoPeerSync(true);

      // add a few more docs
      indexDoc(id, docId, i1, 50, tlong, 50, t1, "document number " + docId++);
      indexDoc(id, docId, i1, 50, tlong, 50, t1, "document number " + docId++);
      commit();

      cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
      assertEquals(docId, cloudClientDocs);

      // now shutdown all other nodes except for 'nodeShutDownForFailure'
      otherJetties.remove(nodePeerSynced);
      forceNodeFailures(otherJetties);
      waitForThingsToLevelOut(30, TimeUnit.SECONDS);
      checkShardConsistency(false, true);

      // now shutdown the original leader
      log.info("Now shutting down initial leader");
      forceNodeFailures(singletonList(initialLeaderJetty));
      log.info("Updating mappings from zk");
      waitForNewLeader(cloudClient, "shard1", initialLeaderJetty.info);
      updateMappingsFromZk(jettys, clients, true);
      // the only remaining up-to-date replica is the one that PeerSynced, so it must win election
      assertEquals(
          "PeerSynced node did not become leader",
          nodePeerSynced,
          shardToLeaderJetty.get("shard1"));

      // bring up node that was down all along, and let it PeerSync from the node that was forced to
      // PeerSync
      bringUpDeadNodeAndEnsureNoReplication(neverLeader, false);
      waitTillNodesActive();

      checkShardConsistency(false, true);

      // bring back all the nodes including initial leader
      // (commented as reports Maximum concurrent create/delete watches above limit violation and
      // reports thread leaks)
      /*for(int i = 0 ; i < nodesDown.size(); i++) {
        bringUpDeadNodeAndEnsureNoReplication(shardToLeaderJetty.get("shard1"), neverLeader, false);
      }
      checkShardConsistency(false, true);*/

      // make sure leader has not changed after bringing initial leader back
      assertEquals(nodePeerSynced, shardToLeaderJetty.get("shard1"));

      // assert metrics: find the metric registry backing the collection1 core
      SolrMetricManager manager = nodePeerSynced.jetty.getCoreContainer().getMetricManager();
      MeterProvider registry = null;
      for (String name : manager.registryNames()) {
        if (name.startsWith("solr.core.collection1")) {
          registry = manager.meterProvider(name);
          break;
        }
      }
      assertNotNull(registry);
      CoreContainer cc = nodePeerSynced.jetty.getCoreContainer();
      String coreName =
          cc.getAllCoreNames().stream()
              .filter(n -> n.contains(DEFAULT_TEST_COLLECTION_NAME))
              .findFirst()
              .orElseThrow(
                  () ->
                      new IllegalStateException(
                          "Couldn't find core for " + nodePeerSynced.coreNodeName));
      // the sync-with-leader timer must have recorded at least one sync, and no sync errors
      // may have been counted — otherwise recovery fell back to full replication
      try (SolrCore core = cc.getCore(coreName)) {
        assertTrue(
            SolrMetricTestUtils.getHistogramDatapoint(
                    core,
                    "solr_core_sync_with_leader_time_milliseconds",
                    SolrMetricTestUtils.newCloudLabelsBuilder(core)
                        .label("category", "REPLICATION")
                        .build())
                .hasCount());
        assertNull(
            SolrMetricTestUtils.getCounterDatapoint(
                core,
                "solr_core_sync_with_leader_sync_errors",
                SolrMetricTestUtils.newCloudLabelsBuilder(core)
                    .label("category", "REPLICATION")
                    .build()));
      }
      success = true;
    } finally {
      // set by bringUpDeadNodeAndEnsureNoReplication; clear even when the test fails mid-way
      System.clearProperty("solr.index.replication.fingerprint.enabled");
    }
  }

  class IndexInBackGround extends Thread {
    private int numDocs;
    private CloudJettyRunner runner;

    public IndexInBackGround(int numDocs, CloudJettyRunner nodeToBringUp) {
      super(getClassName());
      this.numDocs = numDocs;
      this.runner = nodeToBringUp;
    }

    @Override
    public void run() {
      try {
        // If we don't wait for cores get loaded, the leader may put this replica into LIR state
        waitForCoreLoading();
        for (int i = 0; i < numDocs; i++) {
          indexDoc(id, docId, i1, 50, tlong, 50, t1, "document number " + docId);
          docId++;
          // slow down adds, to get documents indexed while in PeerSync
          Thread.sleep(100);
        }
      } catch (Exception e) {
        log.error("Error indexing doc in background", e);
        // Throwing an error here will kill the thread
      }
    }

    private void waitForCoreLoading() throws InterruptedException {
      while (true) {
        if (runner.jetty.getCoreContainer() != null) {
          CoreContainer cc = runner.jetty.getCoreContainer();
          cc.waitForLoadingCoresToFinish(20000);
          break;
        }
        Thread.sleep(100);
      }
    }
  }

  /**
   * Stops each given replica's Jetty, records them in {@link #nodesDown}, and asserts that the
   * number of jetties still up matches the expected shard count.
   *
   * <p>The argument must be non-null (every call site passes a concrete list); the previous
   * {@code != null} guard was dead code because the list was already dereferenced by the loop.
   *
   * @param replicasToShutDown replicas to stop; must not be null
   */
  private void forceNodeFailures(List<CloudJettyRunner> replicasToShutDown) throws Exception {
    for (CloudJettyRunner replicaToShutDown : replicasToShutDown) {
      replicaToShutDown.jetty.stop();
    }

    // start from all shard1 jetties and subtract everything known to be down
    Set<CloudJettyRunner> jetties = new HashSet<>(shardToJetty.get("shard1"));
    jetties.removeAll(replicasToShutDown);
    jetties.removeAll(nodesDown);

    int totalDown = replicasToShutDown.size() + nodesDown.size();
    assertEquals(getShardCount() - totalDown, jetties.size());

    nodesDown.addAll(replicasToShutDown);
  }

  /**
   * Stops a random non-leader replica, indexes two documents it will miss, then restarts it and
   * verifies it recovers via PeerSync (not full replication).
   *
   * @param enableFingerprint whether to enable the index fingerprint check during PeerSync
   * @return the replica that was restarted and PeerSynced
   */
  private CloudJettyRunner forceNodeFailureAndDoPeerSync(boolean enableFingerprint)
      throws Exception {
    // kill non leader - new leader could have all the docs or be missing one
    CloudJettyRunner leaderJetty = shardToLeaderJetty.get("shard1");

    List<CloudJettyRunner> nonLeaderJetties = getOtherAvailableJetties(leaderJetty);
    CloudJettyRunner replicaToShutDown =
        nonLeaderJetties.get(random().nextInt(nonLeaderJetties.size())); // random non leader node

    // singletonList for consistency with the other forceNodeFailures call sites
    forceNodeFailures(singletonList(replicaToShutDown));

    // two docs need to be synced back when replica restarts
    indexDoc(id, docId, i1, 50, tlong, 50, t1, "document number " + docId++);
    indexDoc(id, docId, i1, 50, tlong, 50, t1, "document number " + docId++);
    commit();

    bringUpDeadNodeAndEnsureNoReplication(replicaToShutDown, enableFingerprint);

    return replicaToShutDown;
  }

  /**
   * Restarts a stopped node while documents are being indexed in the background, waits for it to
   * become active, and asserts that it caught up via PeerSync rather than full replication (no
   * replication.properties file, shard-consistent, all docs visible).
   *
   * @param nodeToBringUp the stopped replica to restart
   * @param enableFingerprint whether the index fingerprint check is enabled during PeerSync;
   *     the property set here is cleared by the test's finally block
   */
  private void bringUpDeadNodeAndEnsureNoReplication(
      CloudJettyRunner nodeToBringUp, boolean enableFingerprint) throws Exception {
    // disable fingerprint check if needed
    System.setProperty(
        "solr.index.replication.fingerprint.enabled", String.valueOf(enableFingerprint));
    // we wait a little while, so socket between leader -> replica will be timeout
    Thread.sleep(3000);
    // index 50 docs concurrently so updates arrive during the PeerSync window
    IndexInBackGround iib = new IndexInBackGround(50, nodeToBringUp);
    iib.start();

    // bring back dead node and ensure it recovers
    nodeToBringUp.jetty.start();

    nodesDown.remove(nodeToBringUp);

    waitTillNodesActive();
    waitForThingsToLevelOut(30, TimeUnit.SECONDS);

    // sanity-check the live-jetty bookkeeping against the expected shard count
    Set<CloudJettyRunner> jetties = new HashSet<>();
    jetties.addAll(shardToJetty.get("shard1"));
    jetties.removeAll(nodesDown);
    assertEquals(getShardCount() - nodesDown.size(), jetties.size());

    waitForThingsToLevelOut(30, TimeUnit.SECONDS);

    // let the background indexer finish before committing and counting
    iib.join();

    cloudClient.commit();

    checkShardConsistency(false, false);

    long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
    assertEquals(docId, cloudClientDocs);

    // if there was no replication, we should not have replication.properties file
    String replicationProperties =
        nodeToBringUp.jetty.getSolrHome()
            + "/cores/"
            + DEFAULT_TEST_COLLECTION_NAME
            + "/data/replication.properties";
    assertTrue(
        "PeerSync failed. Had to fail back to replication",
        Files.notExists(Path.of(replicationProperties)));
  }

  /**
   * Waits up to three minutes for every shard1 replica that is not deliberately down to report
   * ACTIVE on a live node.
   */
  private void waitTillNodesActive() throws Exception {
    ZkStateReader zkStateReader = ZkStateReader.from(cloudClient);
    zkStateReader.waitForState(
        "collection1",
        3,
        TimeUnit.MINUTES,
        (liveNodes, collectionState) -> {
          Set<String> downCoreNames = new HashSet<>();
          for (CloudJettyRunner downRunner : nodesDown) {
            downCoreNames.add(downRunner.coreNodeName);
          }
          for (Replica replica : collectionState.getSlice("shard1").getReplicas()) {
            if (downCoreNames.contains(replica.getName())) {
              continue; // deliberately stopped — don't wait on it
            }
            if (replica.getState() != Replica.State.ACTIVE
                || !liveNodes.contains(replica.getNodeName())) {
              return false;
            }
          }
          return true;
        });
  }

  /** Returns the shard1 jetties that are currently up, excluding the given leader (if any). */
  private List<CloudJettyRunner> getOtherAvailableJetties(CloudJettyRunner leader) {
    List<CloudJettyRunner> available = new ArrayList<>(shardToJetty.get("shard1"));
    if (leader != null) {
      available.remove(leader);
    }
    available.removeAll(nodesDown);
    return available;
  }

  /**
   * Builds a document from the given field/value pairs plus a random-length {@code rnd_s} string
   * field, and sends it through the cloud client.
   */
  protected void indexDoc(Object... fields) throws IOException, SolrServerException {
    SolrInputDocument doc = new SolrInputDocument();
    addFields(doc, fields);

    String randomValue =
        RandomStrings.randomAsciiLettersOfLength(random(), random().nextInt(100) + 100);
    addFields(doc, "rnd_s", randomValue);

    UpdateRequest request = new UpdateRequest();
    request.add(doc);
    request.process(cloudClient);
  }

  // skip the randoms - they can deadlock...
  /** Indexes a document built from the given fields, tagged with an {@code rnd_b} marker field. */
  @Override
  protected void indexr(Object... fields) throws Exception {
    SolrInputDocument document = new SolrInputDocument();
    addFields(document, fields);
    addFields(document, "rnd_b", true);
    indexDoc(document);
  }
}
